package com.example.dobs.demo.flink.realtime.report.utils;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.example.dobs.demo.flink.realtime.report.bean.Log;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import static com.example.dobs.demo.flink.realtime.report.utils.Constant.*;


public class LogParse {

    public static String nvl(String value, String defaultValue) {
        return value == null ? defaultValue : value;
    }

    /**
     * 解析request日志
     * 注意：params参数不在存时，丢掉该日志。
     * 输出内容包括：
     * (1) event_name
     * (2) rid
     * (3) pkg/product
     * (4) v_code/version_code
     * (5) pid
     * (6) nation
     * (7) dsp（请求）
     * (8) price（曝光）
     * (9) req (请求量)
     * (10) req_won（同bid_won）
     * (11) imp（曝光量）
     * (12) click（点击量）
     * (13) bid（有效竞价）
     * (14) parti_bid（参与竞价）
     * (15) bid_won（从竞价中胜出）
     * (16) pid_zh（中文名,需要匹配字典）
     * (17) adf_id（素材）
     * (17) tm（事件时间）
     *
     * @param eventJson 请求日志。json格式
     * @param out       输出集合
     */
    public static void requestLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out) {
        requestLogParser(eventJson,out,null);
    }
    public static void requestLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out,String logTimestamp) {
        String rid;
        String product;
        String version_code;
        String pid;
        String nation;
        //
        String dsp;
        String adfId;
        int req = 1;
        int req_won = 0;
        int bid = 0;
        int parti_bid = 0;
        int bid_won = 0;
        String timestamp;
        //
        rid = eventJson.getString("rid");//非空
        version_code = nvl(eventJson.getString("v_code"), blankString);

        timestamp = nvl(logTimestamp,eventJson.getString("tm"));//非空
        product = nvl(eventJson.getString("pkg"), blankString);

        JSONObject params = eventJson.getJSONObject("params");//非空
        pid = nvl(params.getString("pid"), blankString);
        nation = nvl(params.getString("nation"), blankString);
        /*
        resultDsp为空的情况，如下：
        1）没有request;
        2）有request，且dsp的sts都不为1
        */
        String resultDsp = nvl(params.getString("dspName"), blankString);
//        long price = Math.round(Float.parseFloat(nvl(params.getString("price"), "0")) * 1000000);
        JSONArray requestDetail = params.getJSONArray("r_dtl");
        if (requestDetail == null) return;
        for (Object obj : requestDetail) {
            JSONObject jsonObject = (JSONObject) obj;
            dsp = jsonObject.getString("dname");
            if (dsp == null) {//dsp不存在,则丢弃该数据
                continue;
            }
            String sts = jsonObject.getString("sts");
            parti_bid = ("2".equals(sts) || "8".equals(sts) || StringUtils.isBlank(sts)) ? 0 : 1;
            bid = "1".equals(sts) ? 1 : 0;
            bid_won = dsp.equals(resultDsp) ? 1 : 0;//bid success
            req_won = bid_won;

            adfId = nvl(jsonObject.getString("adf_id"), blankString);
            // bid


            Log.Event event = Log.Event.newBuilder()
                    .setEventName(requestEvent)
                    .setRid(rid)
                    .setProduct(product)
                    .setVCode(version_code)
                    .setPid(pid)
                    .setNation(nation)
                    .setDsp(dsp)
                    .setAdfId(adfId)
                    .setPrice(0)//请求不计收入
//                    .setPrice(resultDsp.equals(dsp) ? price : 0)
                    .setReq(req)
                    .setRequestWon(req_won)
                    .setBid(bid)
                    .setBidWon(bid_won)
                    .setPartBid(parti_bid)
                    .setImp(0)
                    .setClick(0)
                    .setTs(Long.parseLong(timestamp))
                    .build();
            out.collect(new Tuple2<String, Log.Event>("request", event));
        }
    }

    /**
     * 解析曝光和点击日志
     * 注意：params参数不在存时，不丢掉该日志。
     *
     * @param eventJson 曝光或点击日志。json格式
     * @param out       输出集合
     */
    public static void impressionLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out) {
        impressionLogParser2(eventJson,out,true,null);
    }

    public static void impressionLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out,String logTimeStamp) {
        impressionLogParser2(eventJson,out,true,logTimeStamp);
    }
    public static void clickLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out) {
        impressionLogParser2(eventJson,out,false,null);
    }
    public static void clickLogParser(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out,String logTimeStamp) {
        impressionLogParser2(eventJson,out,false,logTimeStamp);
    }
    public static void impressionLogParser2(JSONObject eventJson, Collector<Tuple2<String, Log.Event>> out, boolean isImpression,String logTimeStamp) {
        String rid;
        String pid = blankString;
        String nation = blankString;
        String dsp = blankString;
        String v_code;
        String product;
//        boolean isImpression = "AD_AdxShowed".equals(eventJson.getString("event_name"));
        int imp = isImpression ? 1 : 0;
        int click = isImpression ? 0 : 1;
        String timestamp;
        long price = 0;
        rid = eventJson.getString("rid");//必须非空
        v_code = nvl(eventJson.getString("v_code"), blankString);
        timestamp = nvl(logTimeStamp,eventJson.getString("tm"));//必须非空
        product = nvl(eventJson.getString("pkg"), blankString);

        JSONObject params = eventJson.getJSONObject("params");

        if (params != null) {
            price = isImpression ? Math.round(Float.parseFloat(nvl(params.getString("price"), "0")) * 1000000) : 0;//必须非空
            pid = nvl(params.getString("pid"), blankString);
            dsp = nvl(params.getString("dspName"), blankString);
            nation = nvl(params.getString("nation"), blankString);
        }
        Log.Event event = Log.Event.newBuilder()
                .setEventName(impressionEvent)
                .setRid(rid)
                .setProduct(product)
                .setVCode(v_code)
                .setPid(pid)
                .setNation(nation)
                .setDsp(dsp)
                .setAdfId("")
                .setPrice(price)
                .setReq(0)
                .setRequestWon(0)
                .setBid(0)
                .setBidWon(0)
                .setPartBid(0)
                .setImp(imp)
                .setClick(click)
                .setTs(Long.parseLong(timestamp))
                .build();
        out.collect(new Tuple2<>("impression", event));
    }

}
